[SPARK-56651][CONNECT][SDP] Add Python APIs for Auto CDC SCD Type 1#56045
[SPARK-56651][CONNECT][SDP] Add Python APIs for Auto CDC SCD Type 1#56045anew wants to merge 3 commits into
Conversation
- Remove spaces around = in keyword arguments (PEP 8) - Fix type hint: List[Union[str, Column]] -> Union[List[str], List[Column]] - Reorder imports and collapse unnecessary line continuations Co-authored-by: Isaac
AnishMahto
left a comment
There was a problem hiding this comment.
Only real comment is to drop ignore null API for now. LGTM.
| assert sink_obj.source_code_location.filename.endswith("test_graph_element_registry.py") | ||
|
|
||
| def test_create_auto_cdc_flow(self): | ||
| from pyspark.sql.connect.functions.builtin import col, expr |
There was a problem hiding this comment.
non-blocking nit: can just lift imports out of each individual test
| self.assertEqual(sink_obj.options["key1"], "value1") | ||
| assert sink_obj.source_code_location.filename.endswith("test_graph_element_registry.py") | ||
|
|
||
| def test_create_auto_cdc_flow(self): |
There was a problem hiding this comment.
non-blocking nit: This test can just be collapsed with test_create_auto_cdc_flow_with_all_args
There was a problem hiding this comment.
I guess this tests the behavior with minimal required arguments. But I'd say it should validate that the defaults are correct. Adding that.
| ignore_null_updates_column_list: Optional[Union[List[str], List[Column]]] = None, | ||
| ignore_null_updates_except_column_list: Optional[Union[List[str], List[Column]]] = None, |
There was a problem hiding this comment.
Let's add these API later when ignore null execution support is actually built.
There was a problem hiding this comment.
are we not building that?
There was a problem hiding this comment.
We will eventually (hopefully soon!), but I'm generally in favor of only adding the API once the feature is built. Otherwise we'll just be throwing a not support exception anyway if the user tries specifying an ignore null column selection.
| keys: Union[List[str], List[Column]], | ||
| sequence_by: Union[str, Column], | ||
| apply_as_deletes: Optional[Union[str, Column]] = None, | ||
| apply_as_truncates: Optional[Union[str, Column]] = None, |
There was a problem hiding this comment.
Just a heads up, there's a good chance we're not going to get apply_as_truncates functionality merged in for the 4.2 cut.
I'll most likely drop this argument when I connect these APIs to the graph registration context on the spark connect backend, and then add it back for spark 4.3+.
No action needed on your side, just giving the heads up.
- Move inline imports to module level - Fix assertNone -> assertIsNone - Fix assertEqual(stored_as_scd_type, "1") -> assertIsNone for default case - Add missing assertions for optional fields in test_create_auto_cdc_flow Co-authored-by: Isaac
szehon-ho
left a comment
There was a problem hiding this comment.
A few nits on validation, tests, imports, and docstring casing.
| ) | ||
| ignore_null_updates_except_column_list = _normalize_optional_column_list( | ||
| ignore_null_updates_except_column_list | ||
| ) |
There was a problem hiding this comment.
Doc says mutual exclusion and non-empty keys, but nothing enforces it. Validate after normalization (like other SDP APIs) so users get a clear client error.
There was a problem hiding this comment.
Lets leave validation on the engine side (i.e when we construct/analyze the flow objects), so that validation is centralized for all APIs (SQL, Python, and in the future any other spark-connect stub).
Ex. on the engine-side we will have to construct ChangeArgs from the protobuf request, which already validates keys must be non-empty.
| return _normalize_column_list(column_list) | ||
|
|
||
|
|
||
| def _normalize_column_list( |
There was a problem hiding this comment.
Add tests for string args (keys=["id"], sequence_by="ts", etc.), not only Connect col/expr.
| from pyspark.sql.connect.dataframe import DataFrame as ConnectDataFrame | ||
| from pyspark.sql.connect.types import pyspark_types_to_proto_types | ||
| from pyspark.sql.types import StructType | ||
| from pyspark.pipelines.add_pipeline_analysis_context import add_pipeline_analysis_context |
There was a problem hiding this comment.
Nit: import shuffle only — consider keeping prior order to shrink diff.
| table. These keys also identify records in the target table, e.g., if there exists a record \ | ||
| for given keys and the CDC source has an UPSERT operation for the same keys, we will update \ | ||
| the existing record. At least one key must be provided. This should be a list of column \ | ||
| identifiers without qualifiers, expressed as either Python strings or Pyspark Columns. |
There was a problem hiding this comment.
Nit: Pyspark → PySpark in this docstring (573, 575, 581-585).
Per AnishMahto's heads-up on apache#56045, apply_as_truncates is unlikely to land for the 4.2 cut. Following the same principle applied to ignore_null_updates_*, we drop the parameter from the user-facing Python API now and re-add it once execution support is in. The proto field stays so the server-side wiring is untouched.
Per AnishMahto's heads-up on apache#56045, apply_as_truncates is unlikely to land for the 4.2 cut. Following the same principle applied to ignore_null_updates_*, we drop the parameter from the user-facing Python API now and re-add it once execution support is in. The proto field stays so the server-side wiring is untouched.
## Takeover of #56045. PR description is copied. ### What changes were proposed in this pull request? Adds `create_auto_cdc_flow` to the the SDP Python API. For now, this will only support SCD Type 1. Parameters: - name: the name of the flow - target: the target table - source: the source dataset with the change events - keys: the unique key per row, - sequence_by: a sequence id to establish time order - apply_as_deletes: a boolean expression indicating whether an event represents a delete - ~~apply_as_truncates: a boolean expression indicating whether an event represents a truncation~~ - column_list: a list of columns to include in the target table - except_column_list: a list of columns to exclude from the target table - stored_as_scd_type the SCD type, must be 1 - ~~ignore_null_updates_column_list: a list of columns for which to ignore null values~~ - ~~ignore_null_updates_except_column_list: a list of columns for which not to ignore null values~~ - source_code_location: the location in the Python source code that defines this flow This PR introduces the PySpark API to register an AutoCDC flow within an SDP, and send the registration requests to the Spark driver via Spark Connect protos. This PR does not actually handle the reception of said Spark Connect protos, and the pipelines handler in the Spark driver will simply throw some form of an operation unsupported/unrecognized error. ### Why are the changes needed? See the SPIP at https://docs.google.com/document/d/1Hp5BGEYJRHbk6J7XUph3bAPZKRQXKOuV1PEaqZMMRoQ/ ### Does this PR introduce _any_ user-facing change? Yes, it introduces a new method in the SDP Python API. ### How was this patch tested? Unit tests were added, using a local graph registry. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude Sonnet 4.6 Closes #56069 from AnishMahto/SPARK-56651-autocdc-python-api. Lead-authored-by: AnishMahto <anish.mahto99@gmail.com> Co-authored-by: andreas-neumann_data <andreas.neumann@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## Takeover of #56045. PR description is copied. ### What changes were proposed in this pull request? Adds `create_auto_cdc_flow` to the the SDP Python API. For now, this will only support SCD Type 1. Parameters: - name: the name of the flow - target: the target table - source: the source dataset with the change events - keys: the unique key per row, - sequence_by: a sequence id to establish time order - apply_as_deletes: a boolean expression indicating whether an event represents a delete - ~~apply_as_truncates: a boolean expression indicating whether an event represents a truncation~~ - column_list: a list of columns to include in the target table - except_column_list: a list of columns to exclude from the target table - stored_as_scd_type the SCD type, must be 1 - ~~ignore_null_updates_column_list: a list of columns for which to ignore null values~~ - ~~ignore_null_updates_except_column_list: a list of columns for which not to ignore null values~~ - source_code_location: the location in the Python source code that defines this flow This PR introduces the PySpark API to register an AutoCDC flow within an SDP, and send the registration requests to the Spark driver via Spark Connect protos. This PR does not actually handle the reception of said Spark Connect protos, and the pipelines handler in the Spark driver will simply throw some form of an operation unsupported/unrecognized error. ### Why are the changes needed? See the SPIP at https://docs.google.com/document/d/1Hp5BGEYJRHbk6J7XUph3bAPZKRQXKOuV1PEaqZMMRoQ/ ### Does this PR introduce _any_ user-facing change? Yes, it introduces a new method in the SDP Python API. ### How was this patch tested? Unit tests were added, using a local graph registry. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude Sonnet 4.6 Closes #56069 from AnishMahto/SPARK-56651-autocdc-python-api. Lead-authored-by: AnishMahto <anish.mahto99@gmail.com> Co-authored-by: andreas-neumann_data <andreas.neumann@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit a100c0b) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## Takeover of #56045. PR description is copied. ### What changes were proposed in this pull request? Adds `create_auto_cdc_flow` to the the SDP Python API. For now, this will only support SCD Type 1. Parameters: - name: the name of the flow - target: the target table - source: the source dataset with the change events - keys: the unique key per row, - sequence_by: a sequence id to establish time order - apply_as_deletes: a boolean expression indicating whether an event represents a delete - ~~apply_as_truncates: a boolean expression indicating whether an event represents a truncation~~ - column_list: a list of columns to include in the target table - except_column_list: a list of columns to exclude from the target table - stored_as_scd_type the SCD type, must be 1 - ~~ignore_null_updates_column_list: a list of columns for which to ignore null values~~ - ~~ignore_null_updates_except_column_list: a list of columns for which not to ignore null values~~ - source_code_location: the location in the Python source code that defines this flow This PR introduces the PySpark API to register an AutoCDC flow within an SDP, and send the registration requests to the Spark driver via Spark Connect protos. This PR does not actually handle the reception of said Spark Connect protos, and the pipelines handler in the Spark driver will simply throw some form of an operation unsupported/unrecognized error. ### Why are the changes needed? See the SPIP at https://docs.google.com/document/d/1Hp5BGEYJRHbk6J7XUph3bAPZKRQXKOuV1PEaqZMMRoQ/ ### Does this PR introduce _any_ user-facing change? Yes, it introduces a new method in the SDP Python API. ### How was this patch tested? Unit tests were added, using a local graph registry. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude Sonnet 4.6 Closes #56069 from AnishMahto/SPARK-56651-autocdc-python-api. Lead-authored-by: AnishMahto <anish.mahto99@gmail.com> Co-authored-by: andreas-neumann_data <andreas.neumann@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit a100c0b) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?
Adds
create_auto_cdc_flowto the the SDP Python API. For now, this will only support SCD Type 1. Parameters:Why are the changes needed?
See the SPIP at https://docs.google.com/document/d/1Hp5BGEYJRHbk6J7XUph3bAPZKRQXKOuV1PEaqZMMRoQ/
Does this PR introduce any user-facing change?
Yes, it introduces a new method in the SDP Python API.
How was this patch tested?
Unit tests were added, using a local graph registry.
Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Sonnet 4.6